package com.flink.java.demo.partition;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 重新分区 算子
 */
public class TransformTest1_Partition2 {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.setParallelism(5);

        // 从文件读取数据
        DataStream<String> dataStream = env.socketTextStream("192.168.137.101", 7777);

//        dataStream.print("input");

        // 1. huffle随机分区: random.nextInt(下游算子并行度)
        DataStream<String> shuffleStream = dataStream.shuffle();
//        shuffleStream.print("shuffle");

        // 2.  keyby: 按指定key去发送，相同key发往同一个子任务
//        dataStream.keyBy("id").print("keyBy");

        // 3. global: 所有数据全部发送到下游的第一个分区
        dataStream.global().print("global");

        // 4. rebalance轮询：nextChannelToSendTo = (nextChannelToSendTo + 1) % 下游算子并行度
        // 如果是 数据源倾斜的场景， source后，调用rebalance，就可以解决 数据源的 数据倾斜
//        socketDS.rebalance().print();

        //5. rescale缩放： 实现轮询， 局部组队，比rebalance更高效
//        socketDS.rescale().print();


        // 6.broadcast 广播：  发送给下游所有的子任务
//        socketDS.broadcast().print();

//        7. one-to-one: Forward分区器

        //8. 自定义

        // 总结： Flink提供了 7种分区器+ 1种自定义

        env.execute();
    }
}
